package main

import (
	"context"
	"fmt"
	"os"
	"reflect"
	"sync/atomic"
	"time"

	"gitee.com/simonxie979/skymeta"
	"gitee.com/simonxie979/skymeta/protocol"

	"github.com/gogo/protobuf/proto"
)

type Service struct {
	ctx context.Context

	serviceID   uint64
	serviceName string
	count       uint64
	msgCount    uint64
}

func (s *Service) Init(ctx context.Context, serviceID uint64) error {
	s.ctx = ctx
	s.serviceID = serviceID
	s.serviceName = os.Getenv("service_name")
	s.count = 0
	s.msgCount = 0

	// go s.Send()
	go s.Call()
	go s.msgCountPrint()

	return nil
}

func (s *Service) msgCountPrint() {
	ticker := time.NewTicker(time.Second)
	for {
		select {
		case <-s.ctx.Done():
			return
		case <-ticker.C:
			skymeta.Warnf("Service", "MsgCount: %d", atomic.LoadUint64(&s.msgCount))
			// skymeta.Warnf("Service", "service list: %v", skymeta.ListService())
		}
	}
}

func (s *Service) GetServiceID() uint64 {
	return s.serviceID
}

func (s *Service) GetServiceName() string {
	return s.serviceName
}

func (s *Service) Shutdown() {}

func (s *Service) OnMessage(session, source uint64, msgName string, msgBody []byte) {
	atomic.AddUint64(&s.msgCount, 1)

	msgType := proto.MessageType(msgName)
	if msgType == nil {
		skymeta.Errorf("Service", "invalid message: %v, %v, %v, %v", session, source, msgName, string(msgBody))
		return
	}

	obj := reflect.New(msgType.Elem()).Interface().(proto.Message)
	if err := proto.Unmarshal(msgBody, obj); err != nil {
		skymeta.Errorf("Service", "unmarshal message failure. err: %v. args: %v, %v, %v, %v", err, session, source, msgName, string(msgBody))
		return
	}

	if session != 0 {
		respMsg := &protocol.RespMessage{
			Data: fmt.Sprintf("count: %v", s.msgCount),
		}

		s.RespMsg(source, session, respMsg)
	}

	// skymeta.Debugf("Service", "recive msg: %v, %v, %v, %+v", session, source, msgName, obj)
}

func (s *Service) SendMsg(destination uint64, msg proto.Message) {
	data, err := proto.Marshal(msg)
	if err != nil {
		return
	}

	skymeta.Send(s.serviceID, destination, proto.MessageName(msg), data)
}

func (s *Service) CallMsg(destination uint64, msg proto.Message) {
	data, err := proto.Marshal(msg)
	if err != nil {
		return
	}
	_, _, respName, respData, err := skymeta.Call(s.serviceID, destination, proto.MessageName(msg), data)
	if err != nil {
		skymeta.Errorf("Service", "Call failure. err: %v", err)
		return
	}

	msgType := proto.MessageType(respName)
	if msgType == nil {
		skymeta.Errorf("Service", "invalid message: %v, %v", respName, string(respData))
		return
	}

	obj := reflect.New(msgType.Elem()).Interface().(proto.Message)
	if err := proto.Unmarshal(respData, obj); err != nil {
		skymeta.Errorf("Service", "unmarshal message failure. err: %v. args: %v, %v", err, respName, string(respData))
		return
	}

	skymeta.Warnf("Service", "Resp: %v, %#v", respName, obj)
}

func (s *Service) RespMsg(destination, sequence uint64, msg proto.Message) {
	data, err := proto.Marshal(msg)
	if err == nil {
		return
	}

	skymeta.Response(s.serviceID, destination, sequence, proto.MessageName(msg), data)
}

func (s *Service) Send() {
	time.Sleep(time.Second * 3)

	ticker := time.NewTicker(time.Microsecond)
	defer ticker.Stop()

	sendMsg := &protocol.SendMessage{
		Data: "Send Message",
	}

	for {
		select {
		case <-s.ctx.Done():
			return
		case <-ticker.C:
			s.count++

			for _, srvID := range skymeta.QueryService("service1") {
				if srvID != s.serviceID {
					// s.SendMsg(srvID, sendMsg)

					for i := 0; i < 100000; i++ {
						s.SendMsg(srvID, sendMsg)
					}
					skymeta.Warnf("Service", "Send finish")
				}
			}
		}
	}
}

func (s *Service) Call() {
	time.Sleep(time.Second * 3)

	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()

	callMsg := &protocol.CallMessage{
		Data: "Call Message",
	}

	for {
		select {
		case <-s.ctx.Done():
			return
		case <-ticker.C:
			s.count++

			for _, srvID := range skymeta.QueryService("service1") {
				if srvID != s.serviceID {
					s.CallMsg(srvID, callMsg)

					skymeta.Warnf("Service", "Call finish")
				}
			}
		}
	}
}
